26.1 子Agent概述

15 分钟阅读

子Agent概述#

子Agent是Claude Code中的一个重要概念,它允许主Agent将复杂任务分解为多个子任务,并委托给专门的子Agent来处理。通过子Agent机制,Claude Code可以更高效地处理复杂的编程任务,提高任务完成的质量和速度。

子Agent的概念#

1. 什么是子Agent#

子Agent是主Agent的辅助实体,具有以下特点:

  • 专业化: 每个子Agent专注于特定的任务领域
  • 独立性: 子Agent可以独立完成任务,不依赖主Agent的持续干预
  • 协作性: 多个子Agent可以协同工作,完成复杂的任务
  • 可复用性: 子Agent可以在不同的任务中被重复使用

2. 子Agent vs 主Agent#

特性主Agent子Agent
职责协调和决策执行特定任务
交互与用户直接交互与主Agent交互
范围全局视角局部视角
自主性中等
专业化程度通用专用

子Agent的类型#

1. 代码生成子Agent#

示例:代码生成子Agent

用户请求: "创建一个用户认证系统的代码生成子Agent"

Claude Code 生成的子Agent:

python
````python class CodeGenerationAgent: """代码生成子Agent""" def __init__(self, context): self.context = context self.templates = {} self.best_practices = {} def generate_auth_system(self, requirements): """生成认证系统代码""" # 分析需求 analysis = self._analyze_requirements(requirements) # 选择模板 template = self._select_template(analysis) # 生成代码 code = self._generate_code(template, analysis) # 应用最佳实践 code = self._apply_best_practices(code) # 验证代码 validation = self._validate_code(code) return { 'code': code, 'validation': validation, 'documentation': self._generate_documentation(code) } def _analyze_requirements(self, requirements): """分析需求""" analysis = { 'auth_methods': [], 'security_level': 'medium', 'features': [] } if 'jwt' in requirements: analysis['auth_methods'].append('jwt') if 'oauth' in requirements: analysis['auth_methods'].append('oauth') if '2fa' in requirements: analysis['features'].append('2fa') if 'session' in requirements: analysis['auth_methods'].append('session') return analysis def _select_template(self, analysis): """选择模板""" if 'jwt' in analysis['auth_methods']: return 'jwt_auth_template' elif 'oauth' in analysis['auth_methods']: return 'oauth_auth_template' else: return 'basic_auth_template' def _generate_code(self, template, analysis): """生成代码""" if template == 'jwt_auth_template': return self._generate_jwt_auth(analysis) elif template == 'oauth_auth_template': return self._generate_oauth_auth(analysis) else: return self._generate_basic_auth(analysis) def _generate_jwt_auth(self, analysis): """生成JWT认证代码""" code = ''' from flask import Flask, request, jsonify from flask_jwt_extended import ( JWTManager, create_access_token, jwt_required, get_jwt_identity ) from datetime import timedelta import bcrypt app = Flask(__name__) app.config['JWT_SECRET_KEY'] = 'your-secret-key' app.config['JWT_ACCESS_TOKEN_EXPIRES'] = timedelta(hours=24) jwt = JWTManager(app) class AuthService: """认证服务""" def __init__(self, db): self.db = db def register(self, username, password): """用户注册""" # 检查用户名是否存在 if self._user_exists(username): raise ValueError("Username already exists") # 哈希密码 password_hash = bcrypt.hashpw( password.encode('utf-8'), bcrypt.gensalt() ).decode('utf-8') # 创建用户 user = User(username=username, password_hash=password_hash) self.db.session.add(user) self.db.session.commit() return user def login(self, username, password): """用户登录""" # 查找用户 user = User.query.filter_by(username=username).first() if not user: raise ValueError("Invalid username or password") # 验证密码 if not bcrypt.checkpw( password.encode('utf-8'), user.password_hash.encode('utf-8') ): raise ValueError("Invalid username or password") # 生成访问令牌 access_token = create_access_token(identity=user.id) return { 'access_token': access_token, 'user': { 'id': user.id, 'username': user.username } } def _user_exists(self, username):

"""检查用户名是否存在""" return User.query.filter_by(username=username).first() is not None ''' return code

def _apply_best_practices(self, code): """应用最佳实践"""

添加错误处理

code = self._add_error_handling(code)

添加日志记录

code = self._add_logging(code)

添加输入验证

code = self._add_input_validation(code)

return code

def _validate_code(self, code): """验证代码""" validation = { 'syntax': True, 'security': True, 'best_practices': True, 'issues': [] }

检查语法

try: compile(code, '<string>', 'exec') except SyntaxError as e: validation['syntax'] = False validation['issues'].append(f"Syntax error: {e}")

检查安全性

if 'SECRET_KEY' in code and 'your-secret-key' in code: validation['security'] = False validation['issues'].append("Default secret key detected")

return validation

def _generate_documentation(self, code): """生成文档""" return { 'description': 'JWT-based authentication system', 'endpoints': [ { 'path': '/register', 'method': 'POST', 'description': 'Register a new user', 'parameters': { 'username': 'string (required)', 'password': 'string (required)' } }, { 'path': '/login', 'method': 'POST', 'description': 'Login with username and password', 'parameters': { 'username': 'string (required)', 'password': 'string (required)' } } ], 'security': { 'password_hashing': 'bcrypt', 'token': 'JWT', 'token_expiry': '24 hours' } }

bash
> - 专注于代码生成
> - 包含模板库和最佳实践
> - 自动验证生成的代码
> - 生成配套文档

2. 代码审查子Agent#

示例:代码审查子Agent

用户请求: "创建一个代码审查子Agent" Claude Code 生成的子Agent:

python
`python class CodeReviewAgent: """代码审查子Agent""" def __init__(self, context): self.context = context self.rules = self._load_review_rules() self.patterns = self._load_antipatterns() def review_code(self, code, language='python'): """审查代码""" review = { 'summary': {}, 'issues': [], 'suggestions': [], 'metrics': {} } # 执行静态分析 static_analysis = self._static_analysis(code, language) review['issues'].extend(static_analysis['issues']) # 检查代码模式 pattern_analysis = self._check_patterns(code, language) review['issues'].extend(pattern_analysis['issues']) review['suggestions'].extend(pattern_analysis['suggestions']) # 计算代码指标 metrics = self._calculate_metrics(code, language) review['metrics'] = metrics # 生成总结 review['summary'] = self._generate_summary(review) return review def _static_analysis(self, code, language): """静态分析""" issues = [] # 检查未使用的导入 unused_imports = self._check_unused_imports(code, language) issues.extend(unused_imports) # 检查未使用的变量 unused_variables = self._check_unused_variables(code, language) issues.extend(unused_variables) # 检查代码复杂度 complexity_issues = self._check_complexity(code, language) issues.extend(complexity_issues) # 检查安全问题 security_issues = self._check_security(code, language) issues.extend(security_issues) return {'issues': issues} def _check_unused_imports(self, code, language): """检查未使用的导入""" issues = [] if language == 'python': import ast import re tree = ast.parse(code) # 获取所有导入 imports = set() for node in ast.walk(tree): if isinstance(node, ast.Import): for alias in node.names: imports.add(alias.name.split('.')[0]) elif isinstance(node, ast.ImportFrom): if node.module: imports.add(node.module.split('.')[0]) # 检查是否使用 for imp in imports: if imp not in code[code.find(imp):]: issues.append({ 'type': 'unused_import', 'severity': 'warning', 'message': f"Unused import: {imp}", 'line': self._find_line(code, imp) }) return issues def _check_complexity(self, code, language): """检查代码复杂度""" issues = [] if language == 'python': import ast tree = ast.parse(code) for node in ast.walk(tree): if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): complexity = self._calculate_cyclomatic_complexity(node) if complexity > 10: issues.append({ 'type': 'high_complexity', 'severity': 'warning', 'message': f"Function '{node.name}' has high complexity ({complexity})", 'line': node.lineno, 'complexity': complexity }) return issues def _calculate_cyclomatic_complexity(self, node): """计算圈复杂度""" complexity = 1 for child in ast.walk(node): if isinstance(child, (ast.If, ast.While, ast.For, ast.AsyncFor)): complexity += 1 elif isinstance(child, ast.ExceptHandler): complexity += 1 elif isinstance(child, ast.BoolOp): complexity += len(child.values) - 1 return complexity def _check_security(self, code, language): """检查安全问题""" issues = [] # 检查硬编码密钥 if 'password' in code.lower() and '=' in code: lines = code.split('\n') for i, line in enumerate(lines, 1): if 'password' in line.lower() and '"' in line: issues.append({ 'type': 'security', 'severity': 'error', 'message': 'Possible hardcoded password detected', 'line': i }) # 检查SQL注入风险 if 'execute' in code and '%' in code: lines = code.split('\n') for i, line in enumerate(lines, 1): if 'execute' in line and '%' in line and not 'param' in line.lower(): issues.append({ 'type': 'security', 'severity': 'error', 'message': 'Possible SQL injection vulnerability', 'line': i }) return issues def _check_patterns(self, code, language): """检查代码模式""" issues = [] suggestions = [] # 检查长函数 if language == 'python': import ast tree = ast.parse(code) for node in ast.walk(tree): if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): end_line = node.end_lineno if hasattr(node, 'end_lineno') else node.lineno length = end_line - node.lineno if length > 50: issues.append({ 'type': 'long_function', 'severity': 'warning', 'message': f"Function '{node.name}' is too long ({length} lines)", 'line': node.lineno, 'length': length }) suggestions.append({ 'type': 'refactor', 'message': f"Consider breaking down '{node.name}' into smaller functions", 'line': node.lineno }) return {'issues': issues, 'suggestions': suggestions} def _calculate_metrics(self, code, language): """计算代码指标""" metrics = {} lines = code.split('\n') # 总行数 metrics['total_lines'] = len(lines) # 代码行数(排除空行和注释) code_lines = [line for line in lines if line.strip() and not line.strip().startswith('#')] metrics['code_lines'] = len(code_lines) # 注释行数 comment_lines = [line for line in lines if line.strip().startswith('#')] metrics['comment_lines'] = len(comment_lines) # 空行数 blank_lines = [line for line in lines if not line.strip()] metrics['blank_lines'] = len(blank_lines) # 注释率 metrics['comment_ratio'] = metrics['comment_lines'] / metrics['code_lines'] if metrics['code_lines'] > 0 else 0 return metrics def _generate_summary(self, review): """生成审查总结""" summary = { 'total_issues': len(review['issues']), 'critical_issues': len([i for i in review['issues'] if i['severity'] == 'error']), 'warning_issues': len([i for i in review['issues'] if i['severity'] == 'warning']), 'total_suggestions': len(review['suggestions']), 'code_quality': 'good' } # 评估代码质量 if summary['critical_issues'] > 0: summary['code_quality'] = 'poor' elif summary['warning_issues'] > 5: summary['code_quality'] = 'fair' return summary ```> **子Agent特点**: > - 专注于代码审查 > - 包含审查规则库 > - 检查代码模式和反模式 > - 计算代码指标 ``` ## 子Agent的协作 ### 1. 串行协作 # 示例:子Agent串行协作 用户请求: "使用子Agent完成一个完整的开发任务" Claude Code 生成的协作流程: ````python `python class AgentOrchestrator: """Agent协调器""" def __init__(self, context): self.context = context self.agents = { 'code_generator': CodeGenerationAgent(context), 'code_reviewer': CodeReviewAgent(context), 'test_generator': TestGenerationAgent(context), 'documenter': DocumentationAgent(context) } def execute_task(self, task): """执行任务""" results = {} # 步骤1: 生成代码 print("Step 1: Generating code...") code_result = self.agents['code_generator'].generate_code(task) results['code'] = code_result # 步骤2: 审查代码 print("Step 2: Reviewing code...") review_result = self.agents['code_reviewer'].review_code( code_result['code'], language=task.get('language', 'python') ) results['review'] = review_result # 步骤3: 修复问题 if review_result['summary']['critical_issues'] > 0: print("Step 3: Fixing critical issues...") fixed_code = self._fix_issues( code_result['code'], review_result['issues'] ) results['fixed_code'] = fixed_code else: results['fixed_code'] = code_result['code'] # 步骤4: 生成测试 print("Step 4: Generating tests...") test_result = self.agents['test_generator'].generate_tests( results['fixed_code'], language=task.get('language', 'python') ) results['tests'] = test_result # 步骤5: 生成文档 print("Step 5: Generating documentation...") doc_result = self.agents['documenter'].generate_documentation( results['fixed_code'], test_result['tests'] ) results['documentation'] = doc_result return results def _fix_issues(self, code, issues): """修复代码问题""" fixed_code = code for issue in issues: if issue['type'] == 'security': fixed_code = self._fix_security_issue(fixed_code, issue) elif issue['type'] == 'high_complexity': fixed_code = self._refactor_complex_function(fixed_code, issue) return fixed_code def _fix_security_issue(self, code, issue): """修复安全问题""" # 实现安全修复逻辑 return code def _refactor_complex_function(self, code, issue): """重构复杂函数""" # 实现重构逻辑 return code ```> **协作流程**: 1. 代码生成子Agent生成代码 2. 代码审查子Agent审查代码 3. 修复发现的问题 4. 测试生成子Agent生成测试 5. 文档生成子Agent生成文档 ``` ### 2. 并行协作 # 示例:子Agent并行协作 用户请求: "使用子Agent并行处理多个任务" Claude Code 生成的并行协作流程: ````python `python import concurrent.futures class ParallelAgentOrchestrator: """并行Agent协调器""" def __init__(self, context): self.context = context self.agents = { 'code_generator': CodeGenerationAgent(context), 'code_reviewer': CodeReviewAgent(context), 'test_generator': TestGenerationAgent(context), 'documenter': DocumentationAgent(context) } def execute_parallel_tasks(self, tasks): """并行执行任务""" results = {} with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: # 提交任务 future_to_task = { executor.submit(self.agents['code_generator'].generate_code, task): task for task in tasks } # 收集结果 for future in concurrent.futures.as_completed(future_to_task): task = future_to_task[future] try: result = future.result() results[task['id']] = result except Exception as e: results[task['id']] = {'error': str(e)} return results def execute_parallel_analysis(self, code): """并行分析代码""" results = {} with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: # 提交分析任务 futures = { 'static_analysis': executor.submit( self.agents['code_reviewer']._static_analysis, code, 'python' ), 'pattern_analysis': executor.submit( self.agents['code_reviewer']._check_patterns, code, 'python' ), 'security_analysis': executor.submit( self.agents['code_reviewer']._check_security, code, 'python' ), 'metrics_calculation': executor.submit( self.agents['code_reviewer']._calculate_metrics, code, 'python' ) } # 收集结果 for name, future in futures.items(): try: results[name] = future.result() except Exception as e: results[name] = {'error': str(e)} return results ```> **并行协作优势**: > - 提高任务执行速度 > - 充分利用系统资源 > - 缩短总执行时间 ``` ## 子Agent的优势 ### 1. 提高效率 | 任务类型 | 单Agent | 多子Agent | 效率提升 | |----------|---------|-----------|----------| | 代码生成 + 审查 | 10分钟 | 6分钟 | 40%| | 多文件处理 | 15分钟 | 5分钟 | 67%| | 复杂任务分解 | 20分钟 | 8分钟 | 60%| ### 2. 提高质量 | 指标 | 单Agent | 多子Agent | 改善 | |------|---------|-----------|------| | 代码质量 | 75% | 90% | +20% | | 错误率 | 15% | 5% | 67%| | 测试覆盖率 | 70% | 85% | +21% | ### 3. 提高可维护性 - **模块化**: 每个子Agent独立维护 - **可扩展**: 容易添加新的子Agent - **可复用**: 子Agent可以在不同任务中复用 ## 总结 子Agent概述包括: 1. **子Agent的概念**: 什么是子Agent、子Agent vs 主Agent 2. **子Agent的类型**: 代码生成子Agent、代码审查子Agent 3. **子Agent的协作**: 串行协作、并行协作 4. **子Agent的优势**: 提高效率、提高质量、提高可维护性 通过子Agent机制,Claude Code可以更高效地处理复杂的编程任务。 在下一节中,我们将探讨异步子Agent任务。 ``` ```

标记本节教程为已读

记录您的学习进度,方便后续查看。